package com.shujia.core

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Demo5ProcessingTime {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * 统计最近15秒单词的数量每隔5秒统计一次
      *
      */

    val linesDS: DataStream[String] = env.socketTextStream("master", 8888)

    val kvDS: DataStream[(String, Int)] = linesDS.flatMap(_.split(",")).map((_, 1))

    val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)

    /**
      * 处理时间：数据在flink中被处理的时间，这个时间和现实的时间同步
      *
      */

    val countDS: DataStream[(String, Int)] = keyByDS
      //滑动窗口
      .window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)))
      .sum(1)

    countDS.print()

    env.execute()
  }

}
